package cn.azzhu.day05

import cn.azzhu.day02.{SensorReading, SensorSource}
import org.apache.flink.api.common.functions.JoinFunction
import org.apache.flink.api.common.state.{ListStateDescriptor, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

/**
 * 状态变量-状态后端
 *
 * @author azzhu
 * @create 2020-09-21 20:33:28
 */
object ListStateExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    //每隔10s做一次检查点操作
    env.enableCheckpointing(10000L)
    env.setStateBackend(new FsStateBackend(""))

    env.addSource(new SensorSource)
      .filter(_.id.equals("sensor_1"))
      .keyBy(_.id)
      .process(new Keyed)
      .print()

    env.execute("ListStateExample")
  }

  class Keyed extends KeyedProcessFunction[String,SensorReading,String] {
    lazy val listState = getRuntimeContext.getListState(
      new ListStateDescriptor[SensorReading]("list-state",Types.of[SensorReading])
    )

    lazy val timer = getRuntimeContext.getState(
      new ValueStateDescriptor[Long]("timer",Types.of[Long])
    )

    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext,
                         out: Collector[String]): Unit =  {
      //不能直接对列表状态变量进行计算
      val readings:ListBuffer[SensorReading] = new ListBuffer()
      //隐式类型转换必须导入
      import scala.collection.JavaConversions._
      for (r <- listState.get()) {
        readings += r
      }
      out.collect("当前时刻列表状态变量中共有：" + readings.size + "条数据！！！")
      timer.clear()
    }

    override def processElement(value: SensorReading,
                                ctx: KeyedProcessFunction[String, SensorReading, String]#Context,
                                out: Collector[String]): Unit = {
      //value添加到列表状态中
      listState.add(value)
      if(timer.value() == 0L) {
        val ts = ctx.timerService().currentProcessingTime() + 10 * 1000L
        ctx.timerService().registerProcessingTimeTimer(ts)
        timer.update(ts)
      }
    }

  }
}
